package com.xiaofan.java;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Streaming word count over text read from a socket.
 *
 * <p>Each incoming line is split on runs of whitespace ({@code \s+}), every non-empty token is
 * emitted as a {@link WordWithCount} with a count of 1, and the counts are summed per word over
 * 5-second time windows. The results are printed by a sink with parallelism 1.
 *
 * <p>Because the flatMap step is written as a lambda, the produced element type is declared
 * explicitly with {@code .returns(WordWithCount.class)}; see
 * https://blog.csdn.net/jsjsjs1789/article/details/104903409/
 *
 * <p>To start a simple text server for testing: {@code nc -lk 9999}
 */
public class SocketWindowWordCount_A0002 {

    /**
     * Reads {@code --hostname} (optional, defaults to {@code localhost}) and {@code --port}
     * (required) from the command line, then builds and executes the job.
     *
     * <p>If the arguments cannot be read, the cause and a usage message are written to
     * {@code System.err} and the method returns without starting the job.
     *
     * @param args command-line arguments in {@code --key value} form
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {

        final String hostname;
        final int port;

        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            // Surface the real cause: a malformed port must not be reported as a missing one.
            System.err.println("Could not read the hostname/port arguments: " + e.getMessage());
            System.err.println("Please run 'SocketWindowWordCount_A0002 " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One stream element per newline-terminated line received on the socket.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, "\n");

        // The transformations are supplied as lambdas for Flink's functional interfaces.
        DataStream<WordWithCount> windowCounts = text
                .flatMap((FlatMapFunction<String, WordWithCount>) (value, out) -> {
                    String[] tokens = value.split("\\s+");

                    for (String token : tokens) {
                        // A line that starts with whitespace still yields a leading empty token.
                        if (token.length() > 0) {
                            out.collect(new WordWithCount(token, 1));
                        }
                    }
                }).returns(WordWithCount.class)
                // Key by the word through a selector lambda instead of a string field name,
                // so the field reference is checked at compile time.
                .keyBy(value -> value.word)
                // NOTE(review): timeWindow is believed to choose processing time or event time
                // from the environment's time characteristic and to be deprecated in newer Flink
                // releases; confirm against the Flink version in use.
                .timeWindow(Time.seconds(5))
                .reduce((ReduceFunction<WordWithCount>) (value1, value2) ->
                        new WordWithCount(value1.word, value1.count + value2.count));

        // Print the results with a single sink task.
        windowCounts.print().setParallelism(1);

        env.execute("SocketWindowWordCount_A0002");

    }

    /**
     * A word together with the number of times it has been counted.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WordWithCount {
        /** The counted word. */
        public String word;
        /** Number of occurrences accumulated for {@link #word}. */
        public long count;

        /**
         * Formats this entry as {@code word : count}.
         *
         * @return the word and its count separated by {@code " : "}
         */
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
